#include <string>

#include "envoy/common/platform.h"
#include "envoy/config/cluster/v3/cluster.pb.h"

#include "source/common/common/macros.h"
#include "source/common/tcp_proxy/tcp_proxy.h"

#include "test/common/grpc/grpc_client_integration.h"
#include "test/integration/ads_integration.h"
#include "test/integration/fake_upstream.h"
#include "test/integration/integration.h"
#include "test/test_common/resources.h"
#include "test/test_common/utility.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace {

// Integration test fixture for the on-demand CDS (ODCDS) support of the tcp_proxy filter.
//
// The tcp_proxy filter is reconfigured to route to `new_cluster`, which is not part of the static
// bootstrap, and to fetch it on demand over a delta gRPC stream served by `cluster_0`. The test
// cases drive that stream by hand through `odcds_stream_`.
class TcpProxyOdcdsIntegrationTest : public Grpc::GrpcClientIntegrationParamTest,
                                     public BaseIntegrationTest {
public:
  TcpProxyOdcdsIntegrationTest()
      : BaseIntegrationTest(std::get<0>(GetParam()), ConfigHelper::tcpProxyConfig()) {
    // The test envoy uses static listener and cluster for xDS.
    // The test framework does not update the above static resources.
    // Another upstream will be serving on-demand CDS requests and the response is explicitly
    // generated by test case rather than the integration test framework.
    use_lds_ = false;
    enableHalfClose(true);
  }

  // Release the xDS connection that the test cases open against the first fake upstream.
  void TearDown() override { cleanUpXdsConnection(); }

  // Patches the bootstrap, starts the test server and prepares `new_cluster_`.
  //
  // Call `setUpShortTimeout()` before this method if the test needs a short lookup timeout: the
  // value of `odcds_timeout_` is baked into the tcp_proxy config by the modifier registered here.
  void initialize() override {
    config_helper_.addConfigModifier([&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
      // Set xds_cluster: the only static cluster is switched to HTTP/2 for the gRPC stream.
      auto* static_resources = bootstrap.mutable_static_resources();
      ASSERT(static_resources->clusters_size() == 1);
      auto& cluster_protocol_options =
          *static_resources->mutable_clusters(0)->mutable_typed_extension_protocol_options();
      envoy::extensions::upstreams::http::v3::HttpProtocolOptions h2_options;
      h2_options.mutable_explicit_http_config()->mutable_http2_protocol_options();
      cluster_protocol_options["envoy.extensions.upstreams.http.v3.HttpProtocolOptions"].PackFrom(
          h2_options);

      // Add on demand config to tcp_proxy config.
      ASSERT(static_resources->listeners_size() == 1);
      auto* config_blob = static_resources->mutable_listeners(0)
                              ->mutable_filter_chains(0)
                              ->mutable_filters(0)
                              ->mutable_typed_config();

      ASSERT_TRUE(config_blob->Is<envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy>());
      auto tcp_proxy_config =
          MessageUtil::anyConvert<envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy>(
              *config_blob);
      tcp_proxy_config.set_cluster("new_cluster");
      tcp_proxy_config.mutable_on_demand()->CopyFrom(
          TestUtility::parseYaml<
              envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_OnDemand>(R"EOF(
          odcds_config:
            api_config_source:
              api_type: DELTA_GRPC
              grpc_services:
                envoy_grpc:
                  cluster_name: cluster_0
      )EOF"));

      // `odcds_timeout_` has millisecond resolution. Carry the sub-second remainder in `nanos` so
      // that a timeout which is not a whole number of seconds is not silently truncated.
      const auto whole_seconds = std::chrono::duration_cast<std::chrono::seconds>(odcds_timeout_);
      auto* lookup_timeout = tcp_proxy_config.mutable_on_demand()->mutable_timeout();
      lookup_timeout->set_seconds(whole_seconds.count());
      lookup_timeout->set_nanos(static_cast<int32_t>(
          std::chrono::duration_cast<std::chrono::nanoseconds>(odcds_timeout_ - whole_seconds)
              .count()));
      config_blob->PackFrom(tcp_proxy_config);
    });

    // The first upstream serves the ADS request including the future on demand CDS request.
    setUpstreamCount(1);
    setUpstreamProtocol(FakeHttpConnection::Type::HTTP2);

    BaseIntegrationTest::initialize();

    // This second upstream is the endpoint of `new_cluster`.
    // HTTP protocol version is not used because tcp stream is expected.
    addFakeUpstream(FakeHttpConnection::Type::HTTP2);

    new_cluster_ = ConfigHelper::buildStaticCluster(
        "new_cluster", fake_upstreams_.back()->localAddress()->ip()->port(),
        Network::Test::getLoopbackAddressString(ipVersion()));

    test_server_->waitUntilListenersReady();
    registerTestServerPorts({"tcp_proxy"});
  }

  // Shortens the on-demand lookup timeout to one second. Must be called before `initialize()`.
  void setUpShortTimeout() { odcds_timeout_ = std::chrono::milliseconds(1000); }

  // Compares the tcp_proxy on-demand counters with the expected values.
  //
  // @param success expected value of `on_demand_cluster_success`.
  // @param missing expected value of `on_demand_cluster_missing`.
  // @param timeout expected value of `on_demand_cluster_timeout`.
  // @return AssertionSuccess if all three counters exist and match; otherwise an AssertionFailure
  //         whose message lists the expected and actual values. The result must be consumed by the
  //         caller, e.g. with EXPECT_TRUE(); on its own this call does not fail the test.
  ::testing::AssertionResult assertOnDemandCounters(uint64_t success, uint64_t missing,
                                                    uint64_t timeout) {
    auto success_counter = test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_success");
    auto missing_counter = test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_missing");
    auto timeout_counter = test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_timeout");
    if (success_counter && success_counter->value() == success && missing_counter &&
        missing_counter->value() == missing && timeout_counter &&
        timeout_counter->value() == timeout) {
      return ::testing::AssertionSuccess();
    } else {
      // Render a counter for the failure message; a counter that was not found is reported as such.
      auto to_string = [](const Stats::CounterSharedPtr& counter) {
        if (counter == nullptr) {
          return absl::StrCat("not exist");
        } else {
          return absl::StrCat(counter->value());
        }
      };
      return ::testing::AssertionFailure()
             << fmt::format("success {} vs {}, missing {} vs {}, timeout {} vs {}", success,
                            to_string(success_counter), missing, to_string(missing_counter),
                            timeout, to_string(timeout_counter));
    }
  }

  // The on demand CDS stream. The requested cluster config is delivered to Envoy in this stream.
  FakeStreamPtr odcds_stream_;
  // Lookup timeout written into the tcp_proxy on-demand config by `initialize()`.
  std::chrono::milliseconds odcds_timeout_{60000};
  // The prepared cluster config response.
  envoy::config::cluster::v3::Cluster new_cluster_;
  // Upstream-side and client-side connections kept alive by tests that open several at once.
  std::vector<FakeRawConnectionPtr> fake_upstream_connections_;
  std::vector<IntegrationTcpClientPtr> tcp_clients_;
};

// Instantiate the suite once for every parameter value generated by
// GRPC_CLIENT_INTEGRATION_PARAMS.
// NOTE(review): judging by the `IpVersionsClientType` prefix this is presumably the combination of
// IP versions and gRPC client types; the macro is defined outside this file - confirm there.
INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, TcpProxyOdcdsIntegrationTest,
                         GRPC_CLIENT_INTEGRATION_PARAMS);

// Verify the basic flow: a single tcp connection triggers an on-demand CDS request, the upstream
// connection is only established after the cluster is delivered, and data is proxied both ways.
TEST_P(TcpProxyOdcdsIntegrationTest, SingleTcpClient) {
  initialize();

  // Establish a tcp request to the Envoy.
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();
  test_server_->waitForCounterEq("tcp.tcpproxy_stats.on_demand_cluster_attempt", 1);
  // Verify the on-demand CDS request for `new_cluster`.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));
  // The on demand cluster request is received and the response is not sent. The tcp proxy must not
  // have established a connection to the upstream of `new_cluster` yet.
  ASSERT_TRUE(fake_upstreams_.back()->assertPendingConnectionsEmpty());

  // Respond with the prepared `new_cluster` and wait for the acknowledgement.
  sendDeltaDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {new_cluster_}, {}, "1", odcds_stream_.get());
  EXPECT_TRUE(
      compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {}, odcds_stream_.get()));

  // This upstream is listening on the endpoint of `new_cluster`. It starts to serve tcp_proxy.
  FakeRawConnectionPtr fake_upstream_connection;
  ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection));

  ASSERT_TRUE(fake_upstream_connection->write("hello"));
  tcp_client->waitForData("hello");

  ASSERT_TRUE(tcp_client->write("world"));
  ASSERT_TRUE(fake_upstream_connection->waitForData(5));

  ASSERT_TRUE(fake_upstream_connection->close());
  ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
  tcp_client->waitForHalfClose();
  tcp_client->close();
  // The helper only returns an AssertionResult; it has to be consumed to actually fail the test.
  EXPECT_TRUE(assertOnDemandCounters(1, 0, 0));
}

// Verify only one delta xds response is needed for multiple tcp_proxy requests.
TEST_P(TcpProxyOdcdsIntegrationTest, RepeatedRequest) {
  uint32_t expected_upstream_connections = 10;
  initialize();

  // Establish tcp connections to the target Envoy.
  for (auto n = expected_upstream_connections; n != 0; n--) {
    tcp_clients_.push_back(makeTcpConnection(lookupPort("tcp_proxy")));
  }

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();

  // Verify the single on-demand CDS request for `new_cluster`.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));

  // Wait until every connection has started its on-demand lookup BEFORE delivering the cluster.
  // A connection that Envoy handles after the cluster has been added may find the cluster without
  // an on-demand attempt, in which case the counter would never reach the expected value.
  test_server_->waitForCounterEq("tcp.tcpproxy_stats.on_demand_cluster_attempt",
                                 expected_upstream_connections);

  // Respond once with the prepared `new_cluster` and wait for the acknowledgement.
  sendDeltaDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {new_cluster_}, {}, "1", odcds_stream_.get());
  EXPECT_TRUE(
      compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {}, odcds_stream_.get()));

  // This upstream is listening on the endpoint of `new_cluster`.
  for (auto n = expected_upstream_connections; n != 0; n--) {
    fake_upstream_connections_.push_back(nullptr);
    auto& fake_upstream_connection = fake_upstream_connections_.back();
    ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection));
    ASSERT_TRUE(fake_upstream_connection->write("hello"));
  }

  for (auto& tcp_client : tcp_clients_) {
    tcp_client->waitForData("hello");
    ASSERT_TRUE(tcp_client->write("world"));
  }

  for (auto& fake_upstream_connection : fake_upstream_connections_) {
    ASSERT_TRUE(fake_upstream_connection->waitForData(5));
    ASSERT_TRUE(fake_upstream_connection->close());
    ASSERT_TRUE(fake_upstream_connection->waitForDisconnect());
  }

  for (auto& tcp_client : tcp_clients_) {
    // Ignore spurious events as any client can close in any order.
    tcp_client->waitForHalfClose(true);
    tcp_client->close();
  }

  auto success_value =
      test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_success")->value();
  // The cluster look up occurs at least once and at most `expected_upstream_connections` times.
  EXPECT_GE(expected_upstream_connections, success_value);
  EXPECT_LE(1, success_value);
  EXPECT_EQ(0, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_missing")->value());
  EXPECT_EQ(0, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_timeout")->value());
}

// Verify the downstream connection is shut down when the on-demand lookup receives no response
// within the (shortened) timeout.
TEST_P(TcpProxyOdcdsIntegrationTest, ShutdownConnectionOnTimeout) {
  setUpShortTimeout();
  initialize();

  // Establish a tcp request to the Envoy.
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();

  // Verify the on-demand CDS request, then deliberately send no response at all.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));
  EXPECT_EQ(1, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_attempt")->value());

  tcp_client->waitForHalfClose();
  tcp_client->close();
  // The helper only returns an AssertionResult; it has to be consumed to actually fail the test.
  EXPECT_TRUE(assertOnDemandCounters(0, 0, 1));
}

// Verify the downstream connection is shut down when the xDS server reports that the requested
// cluster does not exist.
TEST_P(TcpProxyOdcdsIntegrationTest, ShutdownConnectionOnClusterMissing) {
  initialize();

  // Establish a tcp request to the Envoy.
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();

  // Verify the on-demand CDS request and respond that the required cluster is missing by listing
  // it as a removed resource.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));
  sendDeltaDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
      Config::TypeUrl::get().Cluster, {}, {"new_cluster"}, "1", odcds_stream_.get());
  EXPECT_TRUE(
      compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {}, {}, odcds_stream_.get()));

  EXPECT_EQ(1, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_attempt")->value());

  tcp_client->waitForHalfClose();
  tcp_client->close();
  // The helper only returns an AssertionResult; it has to be consumed to actually fail the test.
  EXPECT_TRUE(assertOnDemandCounters(0, 1, 0));
}

// Verify that every downstream connection waiting for the same on-demand cluster is shut down when
// the lookup times out.
TEST_P(TcpProxyOdcdsIntegrationTest, ShutdownAllConnectionsOnClusterLookupTimeout) {
  setUpShortTimeout();
  initialize();

  // Establish a tcp request to the Envoy.
  IntegrationTcpClientPtr tcp_client_1 = makeTcpConnection(lookupPort("tcp_proxy"));

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();

  // Verify the on-demand CDS request, then deliberately send no response at all.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));

  EXPECT_EQ(1, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_attempt")->value());

  // TODO: assert there is no more on-demand cds request since the first cluster is in flight.
  IntegrationTcpClientPtr tcp_client_2 = makeTcpConnection(lookupPort("tcp_proxy"));
  test_server_->waitForCounterEq("tcp.tcpproxy_stats.on_demand_cluster_attempt", 2);

  // Ignore spurious events: the two clients may be closed in any order.
  tcp_client_1->waitForHalfClose(true);
  tcp_client_2->waitForHalfClose(true);
  // The helper only returns an AssertionResult; it has to be consumed to actually fail the test.
  EXPECT_TRUE(assertOnDemandCounters(0, 0, 2));
  tcp_client_1->close();
  tcp_client_2->close();
}

// Verify the tcp proxy filter can handle the client close while waiting for the cds response.
TEST_P(TcpProxyOdcdsIntegrationTest, ShutdownTcpClientBeforeOdcdsResponse) {
  initialize();

  // Establish a tcp request to the Envoy.
  IntegrationTcpClientPtr tcp_client = makeTcpConnection(lookupPort("tcp_proxy"));

  // The on-demand CDS stream is established.
  auto result = fake_upstreams_.front()->waitForHttpConnection(*dispatcher_, xds_connection_);
  RELEASE_ASSERT(result, result.message());
  result = xds_connection_->waitForNewStream(*dispatcher_, odcds_stream_);
  RELEASE_ASSERT(result, result.message());
  odcds_stream_->startGrpcStream();

  // Verify the on-demand CDS request and stall the response before tcp client close.
  EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().Cluster, {"new_cluster"}, {},
                                           odcds_stream_.get()));
  EXPECT_EQ(1, test_server_->counter("tcp.tcpproxy_stats.on_demand_cluster_attempt")->value());
  // Client disconnect when the tcp proxy is waiting for the on demand response.
  tcp_client->close();
  // No lookup outcome is expected to be recorded. The helper only returns an AssertionResult; it
  // has to be consumed to actually fail the test.
  EXPECT_TRUE(assertOnDemandCounters(0, 0, 0));
}

} // namespace
} // namespace Envoy
